Spark-submit needs the following option:

--packages com.databricks:spark-avro_2.10:2.0.1

In [1]:
# load a dataframe from the uncompressed Avro files for 2016-01-01
avro_reader = sqlContext.read.format("com.databricks.spark.avro")
df = avro_reader.load("/cms/wmarchive/test/avro/2016/01/01/")

In [3]:
%%bash
# total (and replicated) on-disk size of the uncompressed Avro input
hadoop fs -du -h -s /cms/wmarchive/test/avro/2016/01/01/


757.7 M  2.2 G  /cms/wmarchive/test/avro/2016/01/01

In [4]:
# re-write the same dataframe as Snappy-compressed Avro for a size comparison
sqlContext.setConf("spark.sql.avro.compression.codec", "snappy")
snappy_writer = df.write.format("com.databricks.spark.avro")
snappy_writer.save("wmarchive/test-avro-snappy-20160101")

In [5]:
%%bash
# on-disk size of the Snappy-compressed copy (compare with the uncompressed input above)
hadoop fs -du -h -s wmarchive/test-avro-snappy-20160101


188.0 M  564.1 M  wmarchive/test-avro-snappy-20160101

In [9]:
# read back the Snappy-compressed copy so it can be benchmarked below
snappy_reader = sqlContext.read.format("com.databricks.spark.avro")
dfSnappy = snappy_reader.load("wmarchive/test-avro-snappy-20160101")

In [12]:
%%time
# baseline timing: full scan over the uncompressed data
df.count()


CPU times: user 5 ms, sys: 1 ms, total: 6 ms
Wall time: 8.6 s
Out[12]:
200000

In [13]:
%%time
# same full-scan count over the Snappy-compressed copy (compare wall time with baseline)
dfSnappy.count()


CPU times: user 2 ms, sys: 2 ms, total: 4 ms
Wall time: 7.02 s
Out[13]:
200000

In [14]:
# deflate codec (zlib/DEFLATE — the algorithm gzip is built on, not the gzip container format)
sqlContext.setConf("spark.sql.avro.compression.codec", "deflate")
df.write.format("com.databricks.spark.avro").save("wmarchive/test-avro-deflate-20160101")

In [1]:
%%bash
# on-disk size of the deflate-compressed copy (compare with snappy and uncompressed)
hadoop fs -du -h -s wmarchive/test-avro-deflate-20160101


125.9 M  377.8 M  wmarchive/test-avro-deflate-20160101

In [15]:
# read back the deflate-compressed copy for the timing comparison
deflate_reader = sqlContext.read.format("com.databricks.spark.avro")
dfDeflate = deflate_reader.load("wmarchive/test-avro-deflate-20160101")

In [16]:
%%time
# full-scan count over the deflate-compressed copy (compare wall time with baseline and snappy)
dfDeflate.count()


CPU times: user 3 ms, sys: 2 ms, total: 5 ms
Wall time: 7.95 s
Out[16]:
200000

In [ ]: